package com.lianda.operator;

import com.lianda.model.UserAction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Demonstrates rolling aggregation on a {@link KeyedStream}: the stream is keyed by user id,
 * aggregated on a named field, and the result of every rolling aggregation step is printed.
 */
public class AggregateMain {

    /**
     * Key extractor that partitions the stream by the value of {@code UserAction#getUserId()}.
     */
    private static final class UserIdKey implements KeySelector<UserAction, String> {
        @Override
        public String getKey(UserAction action) {
            return action.getUserId();
        }
    }

    /**
     * Registers the fixed, in-memory sample records as the source of the job.
     *
     * @param env the execution environment the source is attached to
     * @return the source stream holding the five sample records
     */
    private static DataStreamSource<UserAction> sampleActions(StreamExecutionEnvironment env) {
        return env.fromCollection(Arrays.asList(
                new UserAction("userID1", -1, "a", "productID3", 10),
                new UserAction("userID2", -1, "b", "", 10),
                new UserAction("userID1", -1, "c", "productID5", 30),
                new UserAction("userID1", -1, "d", "productID5", 40),
                new UserAction("userID1", -1, "e", "productID5", 50)
        ));
    }

    /**
     * Builds the keyed pipeline, prints the rolling maximum of the {@code price} field and
     * runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSource<UserAction> actions = sampleActions(env);
        final KeyedStream<UserAction, String> byUser = actions.keyBy(new UserIdKey());

        // Alternative 1 - rolling sum:
        //     byUser.sum("price").print();
        // Aggregating by field name requires the element class to be a POJO, i.e. it has a
        // no-argument constructor plus getters and setters.

        // Rolling maximum via max(): the printed record may NOT be the record object that the
        // maximum value belongs to. Watch the action value in the printed output to see this.
        byUser.max("price").print();

        // Alternative 2 - rolling maximum via maxBy():
        //     byUser.maxBy("price").print();
        // maxBy() prints the record object that actually holds the maximum value. Compare the
        // action value in the output of max() and maxBy() to see the difference.

        env.execute();
    }
}
